package com.atguigu.gmall.realtime.app.dwd.db;

import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import com.atguigu.gmall.realtime.utils.MySqlUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.time.ZoneId;


/**
 * ClassName: Dwd05_TradePayDetailSuc
 * Package: com.atguigu.gmall.realtime.app.dwd.db
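 * Description:
 *            Trade-domain transaction fact table for successful payments.
 *             Filters payment-success records from the Kafka topic_db topic, reads order-detail fact data from the dwd_trade_order_detail topic, and uses the MySQL lookup dictionary table; the three tables are joined into a wide payment-success table that is written to the Kafka payment-success topic.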
 * @Author ChenJun(有志男青年)
 * @Create 2023/5/5 18:51
 * @Version 1.0
 */

//Data flow: web/app -> business server (MySQL) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> Kafka(DWD)
//Programs: Mock -> MySQL -> Maxwell -> Kafka(ZK) -> Dwd03_TradeOrderDetail -> Kafka(ZK) -> Dwd05_TradePayDetailSuc -> Kafka(ZK)
public class Dwd05_TradePayDetailSuc {

    /** Name of the Kafka topic / Flink table holding order-detail facts. */
    private static final String ORDER_DETAIL = "dwd_trade_order_detail";

    /** Name of the Kafka topic / Flink table holding the raw change-log records. */
    private static final String TOPIC_DB = "topic_db";

    /**
     * Name of the sink topic / Flink table. The same value is passed as the second
     * argument of {@code MyKafkaUtil.getKafkaSourceConnOption} for both sources
     * (presumably the consumer group id -- confirm against MyKafkaUtil).
     */
    private static final String PAY_DETAIL_SUC = "dwd_trade_pay_detail_suc";

    /** Computed event-time column shared by both Kafka source tables. */
    private static final String ROW_TIME_COLUMN =
            "row_time as TO_TIMESTAMP(FROM_UNIXTIME(cast(ts as bigint)))";

    /** Watermark declaration shared by both Kafka source tables (no out-of-orderness delay). */
    private static final String ROW_TIME_WATERMARK = "watermark for row_time as row_time";

    /**
     * Job entry point: registers the source tables, derives the payment views,
     * joins them with the order details and submits the insert into the sink table.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared for the entry point; nothing is caught here
     */
    public static void main(String[] args) throws Exception {

        // Step 1: prepare the stream and table environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));

        // Step 2: checkpointing and state backend settings (currently disabled).
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(
//                3, Time.days(1), Time.minutes(1)
//        ));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage(
//                "hdfs://hadoop102:8020/ck"
//        );
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // Steps 3-4: register the two Kafka-backed source tables.
        createOrderDetailSource(tableEnv);
        createTopicDbSource(tableEnv);

        // Step 5: expose the filtered payment records as the "payment_info" view.
        registerPaymentInfoView(tableEnv);

        // Step 6: register the MySQL lookup dictionary table
        // (the DDL text comes from MySqlUtil; the queries below refer to it as `base_dic`).
        tableEnv.executeSql(MySqlUtil.getMysqlBaseDicDDL());

        // Step 7: join payments with the dictionary, then with the order details.
        registerJoinTableView(tableEnv);
        registerResultTableView(tableEnv);

        // Step 8: register the sink table.
        createPayDetailSucSink(tableEnv);

        // Step 9: write the joined rows into the sink table.
        tableEnv.executeSql("insert into dwd_trade_pay_detail_suc select * from result_table");
    }

    /**
     * Registers the {@code dwd_trade_order_detail} source table; every payload column is a
     * string and {@code row_time} is derived from {@code ts} (cast to bigint, read as unix time).
     */
    private static void createOrderDetailSource(StreamTableEnvironment tableEnv) {
        String columns = String.join(",\n",
                "id string",
                "order_id string",
                "user_id string",
                "sku_id string",
                "sku_name string",
                "province_id string",
                "activity_id string",
                "activity_rule_id string",
                "coupon_id string",
                "date_id string",
                "create_time string",
                "source_id string",
                "source_type string",
                "order_status string",
                "order_status_name string",
                "sku_num string",
                "split_original_amount string",
                "split_activity_amount string",
                "split_coupon_amount string",
                "split_total_amount string",
                "ts string",
                ROW_TIME_COLUMN,
                ROW_TIME_WATERMARK);
        String ddl = "create table dwd_trade_order_detail(\n" + columns + ")"
                + MyKafkaUtil.getKafkaSourceConnOption(ORDER_DETAIL, PAY_DETAIL_SUC);
        tableEnv.executeSql(ddl);
    }

    /**
     * Registers the {@code topic_db} source table with both a processing-time attribute
     * ({@code proc_time}) and an event-time attribute ({@code row_time}).
     */
    private static void createTopicDbSource(StreamTableEnvironment tableEnv) {
        String columns = String.join(",\n",
                "`database` String",
                "`table` String",
                "`type` String",
                "`data` map<String, String>",
                "`old` map<String, String>",
                "`proc_time` as PROCTIME()",
                "`ts` string",
                ROW_TIME_COLUMN,
                ROW_TIME_WATERMARK);
        String ddl = "create table topic_db(" + columns + ")"
                + MyKafkaUtil.getKafkaSourceConnOption(TOPIC_DB, PAY_DETAIL_SUC);
        tableEnv.executeSql(ddl);
    }

    /**
     * Registers the {@code payment_info} view: rows of {@code topic_db} whose table is
     * {@code payment_info} and whose {@code callback_time} entry is not null.
     */
    private static void registerPaymentInfoView(StreamTableEnvironment tableEnv) {
        String projection = String.join(",\n",
                "data['user_id'] user_id",
                "data['order_id'] order_id",
                "data['payment_type'] payment_type",
                "data['callback_time'] callback_time",
                "row_time",
                "`proc_time`",
                "ts");
        String query = "select\n" + projection + "\n"
                + "from topic_db\n"
                + "where `table` = 'payment_info'\n"
                + "and data['callback_time'] is not null";
        Table paidPayments = tableEnv.sqlQuery(query);
        tableEnv.createTemporaryView("payment_info", paidPayments);
    }

    /**
     * Registers the {@code join_table} view: {@code payment_info} joined with {@code base_dic}
     * as of the payment's processing time to resolve the payment type name.
     */
    private static void registerJoinTableView(StreamTableEnvironment tableEnv) {
        String projection = String.join(",\n",
                "pi.order_id order_id",
                "pi.payment_type payment_type_code",
                "dic.dic_name payment_type_name",
                "pi.callback_time",
                "pi.row_time",
                "pi.ts");
        String query = "select\n" + projection + "\n"
                + "from payment_info pi\n"
                + "join `base_dic` for system_time as of pi.proc_time as dic\n"
                + "on pi.payment_type = dic.dic_code";
        Table paymentsWithTypeName = tableEnv.sqlQuery(query);
        tableEnv.createTemporaryView("join_table", paymentsWithTypeName);
    }

    /**
     * Registers the {@code result_table} view: order details joined with {@code join_table}
     * on {@code order_id}, keeping only order-detail rows whose {@code row_time} lies between
     * 15 minutes before and 5 seconds after the payment's {@code row_time}.
     */
    private static void registerResultTableView(StreamTableEnvironment tableEnv) {
        String projection = String.join(",\n",
                "od.id order_detail_id",
                "od.order_id",
                "od.user_id",
                "od.sku_id",
                "od.sku_name",
                "od.province_id",
                "od.activity_id",
                "od.activity_rule_id",
                "od.coupon_id",
                "pi.payment_type_code",
                "pi.payment_type_name",
                "pi.callback_time",
                "od.source_id",
                "od.source_type source_type_code",
                "od.order_status",
                "od.order_status_name",
                "od.sku_num",
                "od.split_original_amount",
                "od.split_activity_amount",
                "od.split_coupon_amount",
                "od.split_total_amount split_payment_amount",
                "pi.ts");
        // The whitespace around the line breaks below is reproduced exactly from the
        // original statement text.
        String query = "select\n" + projection + "\n"
                + "from dwd_trade_order_detail od, join_table pi\n"
                + "where od.order_id = pi.order_id\n "
                + "and od.row_time >= pi.row_time - INTERVAL '15' MINUTE \n"
                + "and od.row_time <= pi.row_time + INTERVAL '5' SECOND";
        Table payDetail = tableEnv.sqlQuery(query);
        tableEnv.createTemporaryView("result_table", payDetail);
    }

    /**
     * Registers the {@code dwd_trade_pay_detail_suc} sink table, keyed (not enforced) by
     * {@code order_detail_id}; connector options come from
     * {@code MyKafkaUtil.getKafkaUpsertSinkConnOption}.
     */
    private static void createPayDetailSucSink(StreamTableEnvironment tableEnv) {
        String columns = String.join(",\n",
                "order_detail_id string",
                "order_id string",
                "user_id string",
                "sku_id string",
                "sku_name string",
                "province_id string",
                "activity_id string",
                "activity_rule_id string",
                "coupon_id string",
                "payment_type_code string",
                "payment_type_name string",
                "callback_time string",
                "source_id string",
                "source_type_code string",
                "order_status string",
                "order_status_name string",
                "sku_num string",
                "split_original_amount string",
                "split_activity_amount string",
                "split_coupon_amount string",
                "split_payment_amount string",
                "ts string",
                "primary key(order_detail_id) not enforced");
        String ddl = "create table dwd_trade_pay_detail_suc(\n" + columns + "\n" + ")"
                + MyKafkaUtil.getKafkaUpsertSinkConnOption(PAY_DETAIL_SUC);
        tableEnv.executeSql(ddl);
    }
}
